import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import org.apache.flink.util.Collector;

import java.io.File;

public class WindowApply
{

    public static void main(String[] args) throws Exception
    {


        //创建flink流执行的环境，获取环境对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //添加一个输入流，这里是让程序监控本机9999端口，可以在本机安装nc程序，然后在控制台执行nc -lk 9999
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                out.collect(Tuple2.of(value.split(",")[0], Integer.valueOf(value.split(",")[1])));
            }
        });


        //操作keyed类型的Windows  求key相同时的sum
        KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = map.keyBy(0);
        keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Tuple2<String, Integer> t: values) {
                            sum += t.f1;
                        }
                        out.collect(sum);
                    }
                }).print();

        keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Tuple2<String,Integer>, Integer, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Tuple2<String, Integer> t: values) {
                            sum += t.f1;
                        }
                        out.collect(sum);
                    }
                }).print();

        //操作none keyed类型的window 求整个窗口的所有key的sum
        map.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .apply(new AllWindowFunction<Tuple2<String,Integer>, Integer, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Tuple2<String, Integer> t: values) {
                            sum += t.f1;
                        }
                        out.collect(sum);
                    }
                }).print();
        map.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new ProcessAllWindowFunction<Tuple2<String,Integer>, Integer, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Tuple2<String, Integer> t: values) {
                            sum += t.f1;
                        }
                        out.collect(sum);
                    }
                }).print();

        env.execute();





    }



}


//代码来自:
//https://blog.csdn.net/zhaocuit/article/details/106588577